package org.example.dataToES

import java.text.SimpleDateFormat
import java.util.Date

import org.example.constant.ApolloConst
import com.alibaba.fastjson.{JSON, JSONObject}
import org.example.common.{Logging, Sparking}
import org.example.utils.{CommonUtils, ZkManager}
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.elasticsearch.spark.rdd.EsSpark
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, Map}

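/**
 * Writes enterprise data to Elasticsearch without applying any business logic.
 * GPS data gets one index per day plus an alias; see
 * /opt/software/spark-project/project-script/data2es/createGPSIndex.sh
 * Project script location:
 * /opt/software/project-jgd/project-script/data-warehouse/RealDataToEs.sh
 */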
object RealDataToEs extends Sparking with Logging with Serializable {

  /**
   * Lookup table from the `warnType` code carried by alarm messages to the
   * human-readable alarm description stored in the `infoContent` field.
   *
   * Each code is listed exactly once. The previous literal repeated
   * "0x00134" and "0x00122" (with identical values), which made it easy for a
   * later edit to silently override an earlier entry.
   *
   * NOTE(review): "0x0101" breaks the "0x001xx" numbering of its neighbours and
   * may have been meant as "0x00101" - confirm against the upstream protocol
   * before changing it; it is kept as-is here.
   */
  val warnTypeMap: Map[String, String] = Map(
    "0x001" -> "超速报警",
    "0x002" -> "疲劳驾驶报警",
    "0x003" -> "紧急报警",
    "0x004" -> "进入指定区域报警",
    "0x005" -> "离开指定区域报警",
    "0x006" -> "路段堵塞报警",
    "0x007" -> "危险路段报警",
    "0x008" -> "越界报警",
    "0x009" -> "盗警",
    "0x00a" -> "劫警",
    "0x00b" -> "偏离路线报警",
    "0x00c" -> "车辆移动报警",
    "0x00d" -> "超时驾驶报警",
    "0x00e" -> "接打电话报警",
    "0x00f" -> "抽烟报警",
    "0x0010" -> "分神报警",
    "0x0011" -> "驾驶员异常报警",
    "0x0012" -> "碰撞报警",
    "0x0013" -> "频繁变道报警",
    "0x0014" -> "胎压报警",
    "0x0101" -> "侧翻报警",
    "0x00102" -> "前向碰撞预警",
    "0x00103" -> "车道偏离预警",
    "0x00104" -> "车距过近预警",
    "0x00105" -> "行人碰撞预警",
    "0x00106" -> "生理疲劳驾驶预警",
    "0x00107" -> "分神驾驶报警",
    "0x00111" -> "胎压异常报警（胎压不平衡报警）",
    "0x00112" -> "胎压异常报警（慢漏气报警）",
    "0x00113" -> "胎压异常报警（电池电量低报警）",
    "0x00114" -> "频繁变道预警",
    "0x00115" -> "急刹车",
    "0x00116" -> "怠速停车",
    "0x00117" -> "低档高速",
    "0x00118" -> "空挡滑行",
    "0x00119" -> "长期异地经营报警",
    "0x00120" -> "凌晨2时至5时行车报警",
    "0x00121" -> "超时疲劳驾驶报警",
    "0x00122" -> "累计驾驶超时报警",
    "0x00123" -> "离线位移报警",
    "0x00124" -> "超过三天车辆无上线报警",
    "0x00125" -> "驾驶员主动报警",
    "0x00126" -> "区间行驶时间不足",
    "0x00127" -> "区间行驶时间过长",
    "0x00128" -> "电子围栏",
    "0x00129" -> "存储故障报警",
    "0x00130" -> "GNSS 模块发生故障",
    "0x00131" -> "GNSS 天线未接或被剪断",
    "0x00132" -> "GNSS 天线短路",
    "0x00133" -> "终端主电源欠压",
    "0x00134" -> "终端主电源掉电",
    "0x00137" -> "摄像头故障",
    "0x00ff" -> "其他报警")

  /**
   * Job entry point: consumes the configured Kafka topics in 60-second batches,
   * splits each batch by source topic and hands every subset to the matching
   * Elasticsearch writer, then stores the batch's end offsets through ZkManager.
   *
   * Offsets are saved only after all writers have returned, so a batch whose
   * write throws does not advance the stored offsets.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Kafka consumer group; the same value keys the offsets kept by ZkManager.
    val groupId = "allDataGps3.2"
    val sparkSession: SparkSession = CommonUtils.getSparkSession()
    val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(60))
    val zkManager = ZkManager(ApolloConst.zkKafka)
    val kafkaParams = getKafkaParams(ApolloConst.bootstrap, groupId)
    val offsets = zkManager.getBeginOffset(ApolloConst.GPS_TOPICS, groupId)
    val inputStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](ApolloConst.GPS_TOPICS, kafkaParams, offsets))

    inputStream.foreachRDD { kafkaRdd =>
      // Read the offset ranges from this batch's own RDD. The previous version
      // kept them in a buffer shared by all batches and refilled it in
      // transform(), which runs when the NEXT batch is generated; a batch that
      // took longer than the batch interval could therefore classify rows and
      // save offsets using another batch's ranges.
      val ranges: Array[OffsetRange] = kafkaRdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val offsetRanges = ArrayBuffer[OffsetRange](ranges: _*)
      val rdd = kafkaRdd.map(record => record.value())

      if (!rdd.isEmpty()) {
        // Tag every row with a numeric code for the topic it came from.
        // Partition i of the batch RDD corresponds to ranges(i); this holds
        // because no shuffle happens between the Kafka RDD and this map.
        val rddClass = rdd.map { row =>
          val code = ranges(TaskContext.get.partitionId).topic match {
            case "UP_EXG_MSG_REAL_LOCATION"    => 0
            case "UP_EXG_MSG_REGISTER"         => 1
            case "UP_WARN_MSG_ADPT_INFO"       => 2
            case "DOWN_LINK_TEST_RSP"          => 3
            case "DOWN_BASE_MSG_VEHICLE_ADDED" => 4
            case "DOWN_CONNECT_RSP"            => 5
            case "UP_CONNECT_REQ"              => 6
            case _                             => 7 // any other topic: not written anywhere
          }
          (code, row)
        }
        // Cached because seven separate write jobs read it below.
        rddClass.cache()
        try {
          def rowsOf(code: Int): RDD[String] = rddClass.filter(_._1 == code).map(_._2)

          locationData2Es(rowsOf(0))  // vehicle real-time location
          registerVehicles(rowsOf(1)) // vehicle registration
          warnData2Es(rowsOf(2))      // vehicle alarms
          downLinkData2Es(rowsOf(3))  // sub-link test responses
          vehicleAdd(rowsOf(4))       // vehicle static info
          downConnect(rowsOf(5))      // sub-link connect
          upConnect(rowsOf(6))        // main-link connect
          zkManager.saveEndOffset(offsetRanges, groupId)
        } finally {
          // Release the cached blocks once the batch is done (or has failed);
          // previously every batch left its cached RDD behind.
          rddClass.unpersist()
        }
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Indexes GPS location messages into the per-day Elasticsearch resource
   * `twodays_gps_index<yyyyMMdd>/twodays_gps_type` (date taken from the driver
   * clock when the batch is processed).
   *
   * A message is indexed only when its `dataType` is "0x1202" and neither `lat`
   * nor `lon` is the string "0". The document id is
   * `vehicleNo#vehicleColor#<epoch millis of dateTime>`.
   *
   * Lines that are not valid JSON, and messages whose `dateTime` cannot be
   * parsed as `yyyy-MM-dd HH:mm:ss`, are logged and skipped instead of failing
   * the whole batch.
   *
   * A new document map is built for every message. The previous implementation
   * reused one mutable map per partition, so a message that was filtered out
   * re-emitted the preceding message's document.
   *
   * @param rdd raw JSON message lines, one message per element
   */
  def locationData2Es(rdd: RDD[String]): Unit = {
    val docs = rdd.mapPartitions { lines =>
      // Created once per partition rather than once per record.
      val timeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
      lines.flatMap { line =>
        val parsed: Option[JSONObject] =
          try {
            // A null parse result is dropped, as before.
            Option(JSON.parseObject(line))
          } catch {
            case e: Exception =>
              error(s"该条数据格式错误:${line}")
              None
          }
        val doc: Option[mutable.HashMap[String, String]] = parsed
          .filter { msg =>
            msg.containsKey("dataType") &&
              "0x1202" == msg.getString("dataType") &&
              msg.getString("lat") != "0" &&
              msg.getString("lon") != "0"
          }
          .flatMap[mutable.HashMap[String, String]] { msg =>
            try {
              val vehicleNo = msg.getString("vehicleNo")
              val vehicleColor = msg.getString("vehicleColor")
              val primaryKey = vehicleNo + "#" + vehicleColor
              // Epoch millis of the message's own timestamp; stored as
              // "datetime" and used as the last component of the id.
              val epochMillis =
                DateTime.parse(msg.getString("dateTime"), timeFormat).getMillis.toString
              Some(mutable.HashMap[String, String](
                "id" -> (primaryKey + "#" + epochMillis),
                "vehicleNo" -> vehicleNo,
                "alarm" -> msg.getString("alarm"),
                "businessTime" -> msg.getString("businessTime"),
                "encrypt" -> msg.getString("encrypt"),
                "vehicleColor" -> vehicleColor,
                "appId" -> msg.getString("appId"),
                "direction" -> msg.getString("direction"),
                "datetime" -> epochMillis,
                "vec1" -> msg.getString("vec1"),
                // Speed reported by the on-board driving recorder, in km/h.
                "vec2" -> msg.getString("vec2"),
                "primaryKey" -> primaryKey,
                "altitude" -> msg.getString("altitude"),
                "longitude" -> msg.getString("lon"),
                "latitude" -> msg.getString("lat")))
            } catch {
              case e: Exception =>
                error(s"该条数据处理失败:${line}")
                None
            }
          }
        doc.iterator
      }
    }
    val indexTime = new SimpleDateFormat("yyyyMMdd").format(new Date())
    val source = "twodays_gps_index" + indexTime + "/twodays_gps_type"
    EsSpark.saveToEs(docs, source, Map("es.mapping.id" -> "id"))
  }

  /**
   * Indexes vehicle alarm messages into `up_warn_msg_adpt_info_index/_doc`.
   *
   * Only messages that contain a `dataType` key are indexed. `infoContent` is
   * the description looked up in `warnTypeMap` for the message's `warnType`, or
   * "报警类型未匹配-<warnType>" when the code is unknown. The document id is
   * `vehicleNo#vehicleColor#<epoch millis of businessTime>`.
   *
   * Lines that are not valid JSON, and messages whose `businessTime` cannot be
   * parsed as `yyyy-MM-dd HH:mm:ss`, are logged and skipped instead of failing
   * the whole batch.
   *
   * A new document map is built for every message. The previous implementation
   * reused one mutable map per partition, so a message without `dataType`
   * re-emitted the preceding message's document.
   *
   * @param rdd raw JSON message lines, one message per element
   */
  def warnData2Es(rdd: RDD[String]): Unit = {
    val docs = rdd.mapPartitions { lines =>
      // Created once per partition rather than once per record.
      val timeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
      lines.flatMap { line =>
        val parsed: Option[JSONObject] =
          try {
            // A null parse result is dropped, as before.
            Option(JSON.parseObject(line))
          } catch {
            case e: Exception =>
              error(s"该条数据格式错误:${line}")
              None
          }
        val doc: Option[mutable.HashMap[String, String]] = parsed
          .filter(msg => msg.containsKey("dataType"))
          .flatMap[mutable.HashMap[String, String]] { msg =>
            try {
              val vehicleNo = msg.getString("vehicleNo")
              val vehicleColor = msg.getString("vehicleColor")
              val primaryKey = vehicleNo + "#" + vehicleColor
              val businessTime = msg.getString("businessTime")
              val warnType = msg.getString("warnType")
              // The description comes from the lookup table, not from the
              // message's own infoContent field.
              val infoContent = warnTypeMap.getOrElse(warnType, "报警类型未匹配-" + warnType)
              val signTime = DateTime.parse(businessTime, timeFormat).getMillis.toString
              Some(mutable.HashMap[String, String](
                "id" -> (primaryKey + "#" + signTime),
                "vehicleNo" -> vehicleNo,
                "vehicleColor" -> vehicleColor,
                "appId" -> msg.getString("appId"),
                "dataType" -> msg.getString("dataType"),
                "businessTime" -> businessTime,
                "primaryKey" -> primaryKey,
                "infoContent" -> infoContent,
                "infoId" -> msg.getString("infoId"),
                "msgId" -> msg.getString("msgId"),
                "warnSrc" -> msg.getString("warnSrc"),
                "warnTime" -> msg.getString("warnTime"),
                "warnType" -> warnType))
            } catch {
              case e: Exception =>
                error(s"该条数据处理失败:${line}")
                None
            }
          }
        doc.iterator
      }
    }
    val source = "up_warn_msg_adpt_info_index" + "/_doc"
    EsSpark.saveToEs(docs, source, Map("es.mapping.id" -> "id"))
  }

  /**
   * Indexes vehicle registration messages into `up_exg_msg_register_index/_doc`.
   *
   * Only messages that contain a `dataType` key are indexed. The document id is
   * `vehicleNo#vehicleColor#<epoch millis of businessTime>`.
   *
   * Lines that are not valid JSON, and messages whose `businessTime` cannot be
   * parsed as `yyyy-MM-dd HH:mm:ss`, are logged and skipped instead of failing
   * the whole batch.
   *
   * A new document map is built for every message. The previous implementation
   * reused one mutable map per partition, so a message without `dataType`
   * re-emitted the preceding message's document.
   *
   * @param rdd raw JSON message lines, one message per element
   */
  def registerVehicles(rdd: RDD[String]): Unit = {
    val docs = rdd.mapPartitions { lines =>
      // Created once per partition rather than once per record.
      val timeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
      lines.flatMap { line =>
        val parsed: Option[JSONObject] =
          try {
            // A null parse result is dropped, as before.
            Option(JSON.parseObject(line))
          } catch {
            case e: Exception =>
              error(s"该条数据格式错误:${line}")
              None
          }
        val doc: Option[mutable.HashMap[String, String]] = parsed
          .filter(msg => msg.containsKey("dataType"))
          .flatMap[mutable.HashMap[String, String]] { msg =>
            try {
              val vehicleNo = msg.getString("vehicleNo")
              val vehicleColor = msg.getString("vehicleColor")
              val primaryKey = vehicleNo + "#" + vehicleColor
              val businessTime = msg.getString("businessTime")
              val epochMillis = DateTime.parse(businessTime, timeFormat).getMillis.toString
              Some(mutable.HashMap[String, String](
                "id" -> (primaryKey + "#" + epochMillis),
                "vehicleNo" -> vehicleNo,
                "vehicleColor" -> vehicleColor,
                "appId" -> msg.getString("appId"),
                "platformId" -> msg.getString("platformId"),
                "businessTime" -> businessTime,
                "producerId" -> msg.getString("producerId"),
                "primaryKey" -> primaryKey,
                "terminalModeType" -> msg.getString("terminalModeType"),
                "terminalId" -> msg.getString("terminalId"),
                "terminalSimCode" -> msg.getString("terminalSimCode")))
            } catch {
              case e: Exception =>
                error(s"该条数据处理失败:${line}")
                None
            }
          }
        doc.iterator
      }
    }
    val source = "up_exg_msg_register_index" + "/_doc"
    EsSpark.saveToEs(docs, source, Map("es.mapping.id" -> "id"))
  }

  /**
   * Indexes sub-link test response messages into `down_link_test_rsp_index/_doc`.
   *
   * Every parseable message is indexed with the fields `appId`, `msgId` and
   * `businessTime`; the document id is the epoch millis of `businessTime`.
   *
   * Lines that are not valid JSON, and messages whose `businessTime` cannot be
   * parsed as `yyyy-MM-dd HH:mm:ss`, are logged and skipped instead of failing
   * the whole batch.
   *
   * A new document map is built for every message instead of mutating and
   * re-emitting one shared map per partition.
   *
   * NOTE(review): the id is derived from `businessTime` alone, so two messages
   * carrying the same timestamp map to the same document id - confirm this is
   * intended before relying on one document per message.
   *
   * @param rdd raw JSON message lines, one message per element
   */
  def downLinkData2Es(rdd: RDD[String]): Unit = {
    val docs = rdd.mapPartitions { lines =>
      // Created once per partition rather than once per record.
      val timeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
      lines.flatMap { line =>
        val parsed: Option[JSONObject] =
          try {
            // A null parse result is dropped, as before.
            Option(JSON.parseObject(line))
          } catch {
            case e: Exception =>
              error(s"该条数据格式错误:${line}")
              None
          }
        val doc: Option[mutable.HashMap[String, String]] =
          parsed.flatMap[mutable.HashMap[String, String]] { msg =>
            try {
              val businessTime = msg.getString("businessTime")
              val epochMillis = DateTime.parse(businessTime, timeFormat).getMillis.toString
              Some(mutable.HashMap[String, String](
                "id" -> epochMillis,
                "appId" -> msg.getString("appId"),
                "businessTime" -> businessTime,
                "msgId" -> msg.getString("msgId")))
            } catch {
              case e: Exception =>
                error(s"该条数据处理失败:${line}")
                None
            }
          }
        doc.iterator
      }
    }
    val source = "down_link_test_rsp_index" + "/_doc"
    EsSpark.saveToEs(docs, source, Map("es.mapping.id" -> "id"))
  }

  /**
   * Indexes vehicle-added messages into `down_base_msg_vehicle_added_index/_doc`.
   *
   * Every parseable message is indexed with the fields `appId`, `businessTime`,
   * `msgId`, `dataType`, `vehicleColor` and `vehicleNo`. Lines that are not
   * valid JSON are logged and skipped.
   *
   * The document id is `vehicleNo_vehicleColor`. The previous implementation
   * concatenated `vehicleNo` with itself, which ignored the colour and let
   * vehicles sharing a plate number overwrite each other.
   * NOTE(review): documents already indexed under the old `vehicleNo_vehicleNo`
   * ids are not rewritten by this job; reindex or clean them up if needed.
   *
   * `businessTime` is stored as received. It is no longer parsed, because the
   * parsed value was never used and a malformed value only failed the batch.
   *
   * A new document map is built for every message instead of mutating and
   * re-emitting one shared map per partition.
   *
   * @param rdd raw JSON message lines, one message per element
   */
  def vehicleAdd(rdd: RDD[String]): Unit = {
    val docs = rdd.mapPartitions { lines =>
      lines.flatMap { line =>
        val parsed: Option[JSONObject] =
          try {
            // A null parse result is dropped, as before.
            Option(JSON.parseObject(line))
          } catch {
            case e: Exception =>
              error(s"该条数据格式错误:${line}")
              None
          }
        val doc: Option[mutable.HashMap[String, String]] = parsed.map { msg =>
          val vehicleNo = msg.getString("vehicleNo")
          val vehicleColor = msg.getString("vehicleColor")
          mutable.HashMap[String, String](
            "id" -> (vehicleNo + "_" + vehicleColor),
            "appId" -> msg.getString("appId"),
            "businessTime" -> msg.getString("businessTime"),
            "msgId" -> msg.getString("msgId"),
            "dataType" -> msg.getString("dataType"),
            "vehicleColor" -> vehicleColor,
            "vehicleNo" -> vehicleNo)
        }
        doc.iterator
      }
    }
    val source = "down_base_msg_vehicle_added_index" + "/_doc"
    EsSpark.saveToEs(docs, source, Map("es.mapping.id" -> "id"))
  }

  /**
   * Indexes sub-link connect messages into `down_connect_rsp/_doc`.
   *
   * Every parseable message is indexed with the fields `appId`, `result`,
   * `businessTime` and `msgId`; the document id is the epoch millis of
   * `businessTime`.
   *
   * Lines that are not valid JSON, and messages whose `businessTime` cannot be
   * parsed as `yyyy-MM-dd HH:mm:ss`, are logged and skipped instead of failing
   * the whole batch.
   *
   * A new document map is built for every message instead of mutating and
   * re-emitting one shared map per partition.
   *
   * NOTE(review): the id is derived from `businessTime` alone, so two messages
   * carrying the same timestamp map to the same document id - confirm this is
   * intended before relying on one document per message.
   *
   * @param rdd raw JSON message lines, one message per element
   */
  def downConnect(rdd: RDD[String]): Unit = {
    val docs = rdd.mapPartitions { lines =>
      // Created once per partition rather than once per record.
      val timeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
      lines.flatMap { line =>
        val parsed: Option[JSONObject] =
          try {
            // A null parse result is dropped, as before.
            Option(JSON.parseObject(line))
          } catch {
            case e: Exception =>
              error(s"该条数据格式错误:${line}")
              None
          }
        val doc: Option[mutable.HashMap[String, String]] =
          parsed.flatMap[mutable.HashMap[String, String]] { msg =>
            try {
              val businessTime = msg.getString("businessTime")
              val epochMillis = DateTime.parse(businessTime, timeFormat).getMillis.toString
              Some(mutable.HashMap[String, String](
                "id" -> epochMillis,
                "appId" -> msg.getString("appId"),
                "result" -> msg.getString("result"),
                "businessTime" -> businessTime,
                "msgId" -> msg.getString("msgId")))
            } catch {
              case e: Exception =>
                error(s"该条数据处理失败:${line}")
                None
            }
          }
        doc.iterator
      }
    }
    val source = "down_connect_rsp" + "/_doc"
    EsSpark.saveToEs(docs, source, Map("es.mapping.id" -> "id"))
  }

  /**
   * Indexes main-link connect messages into `up_connect_rsp/_doc`.
   *
   * Every parseable message is indexed with the fields `appId`, `result`,
   * `businessTime` and `msgId`; the document id is the epoch millis of
   * `businessTime`.
   *
   * Lines that are not valid JSON, and messages whose `businessTime` cannot be
   * parsed as `yyyy-MM-dd HH:mm:ss`, are logged and skipped instead of failing
   * the whole batch.
   *
   * A new document map is built for every message instead of mutating and
   * re-emitting one shared map per partition.
   *
   * NOTE(review): the id is derived from `businessTime` alone, so two messages
   * carrying the same timestamp map to the same document id - confirm this is
   * intended before relying on one document per message.
   *
   * @param rdd raw JSON message lines, one message per element
   */
  def upConnect(rdd: RDD[String]): Unit = {
    val docs = rdd.mapPartitions { lines =>
      // Created once per partition rather than once per record.
      val timeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
      lines.flatMap { line =>
        val parsed: Option[JSONObject] =
          try {
            // A null parse result is dropped, as before.
            Option(JSON.parseObject(line))
          } catch {
            case e: Exception =>
              error(s"该条数据格式错误:${line}")
              None
          }
        val doc: Option[mutable.HashMap[String, String]] =
          parsed.flatMap[mutable.HashMap[String, String]] { msg =>
            try {
              val businessTime = msg.getString("businessTime")
              val epochMillis = DateTime.parse(businessTime, timeFormat).getMillis.toString
              Some(mutable.HashMap[String, String](
                "id" -> epochMillis,
                "appId" -> msg.getString("appId"),
                "result" -> msg.getString("result"),
                "businessTime" -> businessTime,
                "msgId" -> msg.getString("msgId")))
            } catch {
              case e: Exception =>
                error(s"该条数据处理失败:${line}")
                None
            }
          }
        doc.iterator
      }
    }
    val source = "up_connect_rsp" + "/_doc"
    EsSpark.saveToEs(docs, source, Map("es.mapping.id" -> "id"))
  }
}
